package tcp

import (
	"bufio"
	"encoding/hex"
	"fmt"
	"gitee.com/sansaniot/ssiot-gw-lib-comm/comm/common"
	"gitee.com/sansaniot/ssiot-gw-lib-comm/comm/types"
	"gitee.com/sansaniot/ssiot-gw-lib-comm/common/global"
	"gitee.com/sansaniot/ssiot-gw-lib-comm/model"
	"github.com/rs/zerolog/log"
	"net"
	"strings"
	"time"
)

var (
	// 连接对象
	connections = new(types.ConnPool)
	// 监听对象
	listeners = new(types.ListenerPool)
)

func getConnLocalPort(conn net.Conn) string {
	return strings.Split(conn.LocalAddr().String(), ":")[1]
}

func getListenerLocalPort(listener net.Listener) string {
	// 比如 [::]:18899
	addr := strings.Split(listener.Addr().String(), ":")
	return addr[len(addr)-1]
}

// goroutine 从对应的通道中接收命令并通过连接发送
func listenAndSendCommand(commId string) {
	log.Info().Msgf("listen command channel for commId %s", commId)

	for {
		//如果监听对象已经不存在，则协程结束
		if _, ok := listeners.Load(commId); !ok {
			log.Info().Msgf("stopped to listen command channel for commId %s", commId)
			return
		}

		commandChannelAny, _ := types.ProbeCommandChannels.Load(commId)
		commandChannel := commandChannelAny.(chan *types.ProbeCommand)
		command := <-commandChannel

		//取出对应网关的连接
		conn, ok := connections.Load(command.CommId)
		if !ok {
			log.Debug().Msgf("no conns when sending command for %s", commId)
			common.EndModbusProbe(commId)
			continue
		}

		_, err := conn.(net.Conn).Write(command.Command)
		if err != nil {
			log.Debug().Msgf("sending command error for commId %s: %s", commId, err)
			continue
		}

		log.Debug().Msgf("sending command for commId %s: %x", commId, command.Command)
	}
}

// goroutine 处理单个连接，一个连接对应一个dtu，dtu下有1个或多个设备
func receiveData(conn net.Conn, commId string) {
	port := getConnLocalPort(conn)
	// 读取数据失败后销毁监听，因此如果平台改了端口，需要dtu断开连接后，才会重新监听新端口
	// 连接中断后，关闭连接，设备下线，停止采集
	defer destroyNetConn(commId)
	defer common.EndModbusProbe(commId)
	// 连接中断则设备下线
	defer common.Offline(commId)

	for {
		//对于已没有进行监听的端口的连接，在收到下一次数据后关闭
		if _, ok := listeners.Load(commId); !ok {
			log.Info().Msgf("close conn for commId %s on port %s for no listener", commId, port)
			return
		}

		// 从TCP流读取数据，长度随意，由DPU库来解析应用层协议
		reader := bufio.NewReader(conn)
		var buf [512]byte
		n, err := reader.Read(buf[:])

		// 如果不对timeout错误做判断，timeout后，设备下线并停止采集，除非设备重连，否则无法再次开始采集
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			log.Info().Msgf("read data from client timeout for commId %s on port %s, continue", commId, port)
			continue
		}

		// 连接关闭
		if err != nil {
			log.Info().Msgf("read data from client failed so close conn for commId %s on port %s, %s", commId, port, err)
			break
		}

		revStr := string(buf[:n])
		data := []byte(revStr)

		// 发送数据到DPU
		if common.OnReceiveData != nil {
			common.OnReceiveData(commId, data)
		}

		hexData := hex.EncodeToString(data)
		log.Debug().Msgf("receiving data of port %s commId %s: %s", port, commId, hexData)
	}
}

//销毁连接
func destroyNetConn(commId string) {
	_, ok := connections.Load(commId)
	if !ok {
		return
	}

	conn, _ := connections.Load(commId)
	var err error
	err = nil

	if conn != nil {
		err = conn.Close()
	}

	if err == nil {
		connections.Delete(commId)
		log.Info().Msgf("commId %s destroyed tcp conn:%v", commId, conn)
	} else {
		time.Sleep(3 * time.Second)
		conn.Close()
		connections.Delete(commId)
		log.Info().Msgf("commId %s destroyed tcp again conn:%v", commId, conn)
	}
}

func acceptConn(commId string, listener net.Listener) {
	var port string

	for {
		conn, err := listener.Accept()
		if err != nil {
			time.Sleep(1 * time.Second)
			log.Info().Msgf("commId %s accept connection failed as listener closed, %s", commId, err)
			//当Accept失败时，结束协程，如果continue，当listener已经close时，会一直报错
			break
		}
		port = getConnLocalPort(conn)
		log.Info().Msgf("commId %s on port %s accepted a connection", commId, port)

		// 新的连接上来，如果老连接没中断，不建立连接
		// 如果在服务端识别到的老的连接中断前，新的连接已连接上来，又不被接受，会造成一段时间的数据中断
		// 但这样是合理的，否则新的连接上来就用新的连接，会导致不安全
		if _, exist := connections.Load(commId); exist {
			log.Info().Msgf("commId %s conn on port %s exist, will not accept new before it is broken", commId, port)
			continue
		}

		// 将连接放入连接池
		connections.Store(commId, conn)
		// 连接上的设备都上线
		common.Online(commId)
		//获得连接后接收数据
		go receiveData(conn, commId)
		//获得连接后开始采集
		common.StartModbusProbe(commId)
	}
}

func createListener(commId string) bool {
	//监听端口
	listen, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%s", commId))
	if err != nil {
		log.Info().Msgf("listen failed, %s:", err)
		return false
	}

	listeners.Store(commId, listen)
	log.Info().Msgf("start listening on port %s", commId)
	return true
}

func destroyListener(gateway string) {
	listener, ok := listeners.Load(gateway)
	if !ok {
		return
	}

	err := listener.Close()
	if err == nil {
		listeners.Delete(gateway)
		log.Info().Msgf("commId %s destroyed listener", gateway)
	}
}

func startProbes(commIds model.CommIds) {
	//对平台已配置的，但没有监听的端口，开始监听。 无dpu配置也需要监听，否则设备不能上线，无法同步配置
	for commId, _ := range commIds {
		//如果端口为0，会监听任意端口
		if commId == "0" {
			continue
		}

		//平台已配置，且已监听，则不作后续处理，除非端口有变化，则需要关闭监听对象和连接，后续重新用新端口打开
		if listener, ok := listeners.Load(commId); ok {
			currentListenPort := getListenerLocalPort(listener)
			if currentListenPort != commId {
				log.Info().Msgf("tcp port changed from %s to %s", currentListenPort, commId)
				destroyListener(commId)
				destroyNetConn(commId)
			}
			continue
		}

		//对每个网关创建监听对象
		if true == createListener(commId) {
			//初始化接收采集数据的通道
			common.InitDataChannel(commId)

			//初始化接收采集命令的通道
			common.InitCommandChannel(commId)
			//监听下发命令通道
			go listenAndSendCommand(commId)

			//每个端口一个协程用于接收连接
			listener, _ := listeners.Load(commId)
			go acceptConn(commId, listener)
		}
	}
}

func endProbes(commIds model.CommIds) {
	listeners.Range(func(commId string, listener net.Listener) bool {
		//对已监听的，但平台没有配置的端口停止监听、停止采集
		if _, ok := commIds[commId]; !ok {
			common.EndModbusProbe(commId)
			destroyListener(commId)
			destroyNetConn(commId)
		}
		return true
	})
}

func ManageProbes(...interface{}) {
	device := &model.Device{CommType: global.Tcp}
	commIds, result := device.FindCommIdByType()

	if result.Error != nil {
		log.Info().Msgf("sql query currently unavailable: %s", result.Error)
		return
	}

	startProbes(commIds)
	endProbes(commIds)
}
